<?php
namespace App\Libs\wrapper;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Rabbitmq{
/////////////////////////////////////////////// Rabbitmq
	
	//MQ的默认连接配置
	public $config = array(
		'host' => '127.0.0.1', //ip   127.0.0.1  47.244.228.158
		'port' => '5672',      //端口号
		'user' => 'root',     //用户  guest
		'password' => '123456', //密码  guest
		'vhost' => '/'         //虚拟host
	);
	
	public $connection;     //链接
	public $channel;        //信道
	
	public $exchange_name = '';     //交换机名
	public $queue_name = '';        //队列名
	public $route_key = '';         //路由键
	public $exchange_type = 'direct';    //交换机类型
	
	public $autoAck = false; //是否自动ack应答
	
	/*
	 *  初始化
	 * @param $exchange_name 交换机名称
	 * @param $queue_name 队列名称
	 * @param $route_key 路由键
	 * @param $exchange_type 交换机类型
	 * @param $config MQ配置
	 * */
	public function __construct($exchange_name, $queue_name, $route_key, $exchange_type='direct', $config=array())
	{
		$this->exchange_name = empty($exchange_name) ? '' : $exchange_name;
		$this->queue_name = empty($queue_name) ? '' : $queue_name;
		$this->route_key = empty($route_key) ? '' : $route_key;
		$this->exchange_type = empty($exchange_type) ? '' : 'direct';
		if(!empty($config))
		{
			$this->setConfig($config);
		}
		$this->createConnect();
	}
	
	/*
	 * 创建连接与信道
	 * */
	private function createConnect()
	{
		$host = $this->config['host'];
		$port = $this->config['port'];
		$user = $this->config['user'];
		$password = $this->config['password'];
		$vhost = $this->config['vhost'];
		if(empty($host) || empty($port) || empty($user) || empty($password))
		{
			throw new Exception('RabbitMQ的连接配置不正确');
		}
		
		//创建链接
		$this->connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
		//创建信道
		$this->channel = $this->connection->channel();
		
		//创建交换机、队列
		$this->create_exchange();
	}
	
	/*
	 * 创建交换机、队列
	 *
	 * */
	private function create_exchange()
	{
		//// 声明初始化交换机（exchange）
		$exhcange_name = $this->exchange_name;//交换机名称
		//设置交换机类型：direct（直连交换机）、fanout（扇形交换机）、headers（头交换机）、topic（主题交换机）
		$type = $this->exchange_type;
		$passive = false;//消极处理， 判断是否存在队列，存在则返回，不存在直接抛出
		$durable = true;//交换机是否开启持久化  true:服务器重启会保留下来Exchange。警告：仅设置此选项，不代表消息持久化。即不保证重启后消息还在
		$auto_delete = false;//通道关闭后是否删除队列   true:当已经没有消费者时，服务器是否可以删除该Exchange
		($this->channel)->exchange_declare($exhcange_name,  $type, false, $durable, $auto_delete);
		
		//// 声明队列（queue）
		$queue_name = $this->queue_name;//队列名称
		$passive = false;//消极处理， 判断是否存在队列，存在则返回，不存在直接抛出   true
		$durable = true;//是否开启队列持久化    true：在服务器重启时，能够存活
		$exclusive = false;// 队列是否可以被其他队列访问   是否为当前连接的专用队列，在连接断开后，会自动删除该队列
		$auto_delete = false;//通道关闭后是否删除队列
		($this->channel)->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete);
		
		// 交换机与队列绑定
		($this->channel)->queue_bind($this->queue_name,$this->exchange_name, $this->route_key);
	}
	
	/*
	 * 发送消息
	 * @param $data 消息内容
	 * */
	public function send_message($msg)
	{
		//创建消息$msg = new AMQPMessage($data,$properties)
		//#$data  string类型 要发送的消息
		//#roperties array类型 设置的属性，比如设置该消息持久化[‘delivery_mode’=>2]
		
		$msg = json_encode($msg);
		
		//设置的属性
		$properties = array(
			'delivery_mode' => 2
		);
		$message = new AMQPMessage($msg, $properties);
		
		//// 发送消息
		($this->channel)->basic_publish($message, $this->exchange_name, $this->route_key);
	}
	
	/*
	 * 处理消息
	 * */
	public function deal_mq($callback)
	{
//		var_dump('处理消息');exit;
		// 交换机与队列绑定
		($this->channel)->queue_bind($this->queue_name,$this->exchange_name, $this->route_key);
		//prefetchSize：0
		//prefetchCount：会告诉RabbitMQ不要同时给一个消费者推送多于N个消息，即一旦有N个消息还没有ack，则该consumer将block掉，直到有消息ack
		//global：true\false 是否将上面设置应用于channel，简单点说，就是上面限制是channel级别的还是consumer级别
		//$this->channel->basic_qos(0, 1, false);
		//1:queue 要取得消息的队列名
		//2:consumer_tag 消费者标签
		//3:no_local false这个功能属于AMQP的标准,但是rabbitMQ并没有做实现.参考
		//4:no_ack  false收到消息后,是否不需要回复确认即被认为被消费
		//5:exclusive false排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
		//6:nowait  false不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
		//7:callback  null回调函数
		//8:ticket  null
		//9:arguments null
		($this->channel)->basic_consume($this->queue_name, '', false, $this->autoAck, false, false, $callback);
		//监听消息
		while(count($this->channel->callbacks)){
			($this->channel)->wait();
		}
	}
	
	/*
	 * 消息ack确认
	 * */
	public function msg_ack($msg)
	{
		//手动ack应答
		$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
	}
	
	/*
	 * 关闭连接
	 * */
	public function close_connetct()
	{
		$this->channel->close();
		$this->connection->close();
	}
	
	//重新设置MQ的链接配置
	public function setConfig($config)
	{
		if (!is_array($config))
		{
			throw new Exception('config不是一个数组');
		}
		foreach($config as $key => $value)
		{
			$this->config[$key] = $value;
		}
	}
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
}//